package com.kvn.dal.core.retry.beforehand;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import javax.annotation.Resource;

import org.apache.commons.lang3.StringUtils;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.framework.Advised;
import org.springframework.aop.support.AopUtils;
import org.springframework.context.ApplicationContext;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

import com.alibaba.fastjson.JSON;
import com.google.common.base.Function;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimaps;
import com.kvn.dal.core.TimedTask;
import com.kvn.dal.core.dao.IJobBeforehandRetryDao;
import com.kvn.dal.core.pojo.JobBeforehandRetry;
import com.kvn.dal.core.pojo.enums.JobBeforehandRetryStatus;
import com.kvn.dal.core.retry.RetryParam;
import com.kvn.dal.core.task.ExecutableTask;
import com.kvn.dal.log.Log;

/**
 * 事前重试分发调度
* @author wzy
* @date 2017年6月23日 下午5:30:49
*/
@TimedTask(cron = "${beforehandRetry.cron}", desc = "dal-job事前重试分发调度")
public class BeforehandRetryDispatchJob implements ExecutableTask {
	private static final Logger logger = LoggerFactory.getLogger(BeforehandRetryDispatchJob.class);
	
	/**
	 * 业务最大执行时间，默认为5min。超过5min，则认为业务执行超时。可能是DB挂了，没有来得及更新重试记录。<br/>
	 */
	private long bizMaxExecuteTime = TimeUnit.SECONDS.toSeconds(300);
	/**
	 * 指定是否对超时记录进行重试，默认false
	 */
	private boolean retryBizTimeout  = false;
	
	@Resource
	private IJobBeforehandRetryDao jobBeforehandRetryDao;
	@Resource
	private ApplicationContext applicationContext;

	@Override
	public void execute(JobExecutionContext context) throws JobExecutionException {
		logger.info("RetryBeforehandDispatchJob --> retry start.....");
		// 1. 取重试数据依次分发。按时间正排序
		List<JobBeforehandRetry> beforehandRetryLs = jobBeforehandRetryDao.getNeedRetryList();
		
		// 2. 添加业务超时的重试数据
		if(retryBizTimeout){
			List<JobBeforehandRetry> bizTimeoutLs = jobBeforehandRetryDao.getBizTimeOutList(bizMaxExecuteTime);
			if(!CollectionUtils.isEmpty(bizTimeoutLs)){
				for(JobBeforehandRetry retry : bizTimeoutLs){
					// 设置临时状态
					retry.setStatus(JobBeforehandRetryStatus.BIZ_INIT_TEMP);
//					retry.setMaskStatus(JobBeforehandRetryStatus.BIZ_INIT_TEMP);
				}
				beforehandRetryLs.addAll(bizTimeoutLs);
				logger.info("RetryBeforehandDispatchJob --> 添加业务超时重试记录[{}]条", bizTimeoutLs.size());
			}
		}
		
		if(CollectionUtils.isEmpty(beforehandRetryLs)){
			logger.info("RetryBeforehandDispatchJob --> 没有需要重试的记录....exit....");
			return;
		}
		
		// 3. 进行重试分发
		long start = System.currentTimeMillis();
		for(JobBeforehandRetry beforehandRetry : beforehandRetryLs){
			long startInner = System.currentTimeMillis();
			try {
				doDispatch(beforehandRetry);
			} catch (Exception e) {
				// 重试记录之间互不影响，直到没有待重试的记录为止
			}
			long endInner = System.currentTimeMillis();
			logger.info("RetryBeforehandDispatchJob --> 重试[retryId:{}, class:{}]结束，总共耗时[{}ms].......", beforehandRetry.getId(), beforehandRetry.getRetryClassName(), (endInner - startInner));
		}
		long end = System.currentTimeMillis();
		
		logger.info("RetryBeforehandDispatchJob --> 本次重试结束，总共耗时[{}ms].......", (end - start));
	}

	private void handleRetryResult(JobBeforehandRetry jobBeforehandRetry, BeforehandRetry brAnno, Throwable catchedException) {
		final String OP = "RetryBeforehandDispatchJob#handleRetryExceptionResult";
		if(catchedException == null){ // 业务执行正常
			int count = jobBeforehandRetryDao.updateStatus(jobBeforehandRetry.getId(), JobBeforehandRetryStatus.RETRY_SUCCESS, jobBeforehandRetry.getStatus(), "重试业务成功[end]", null);
			logger.info(Log.op(OP).msg("业务执行成功，结束").kv("retryId", jobBeforehandRetry.getId()).kv("count", count).toString());
			return;
		}
		String errMsg = Strings.nullToEmpty(catchedException.getMessage());
		errMsg = errMsg.length() > 120 ? errMsg.substring(0, 120) : errMsg;
		if(needRetry(brAnno, catchedException)){// 需要重试，更新调用记录信息
			String flowPath = jobBeforehandRetry.getStatus() == JobBeforehandRetryStatus.BIZ_INIT ? "重试初始->" : "重试失败->";
			int count = jobBeforehandRetryDao.updateStatus(jobBeforehandRetry.getId(), JobBeforehandRetryStatus.RETRY_INIT, jobBeforehandRetry.getStatus(), flowPath, errMsg);
			logger.info(Log.op(OP).msg("业务执行失败，等待下次重试....").kv("retryId", jobBeforehandRetry.getId()).kv("count", count).toString(), catchedException);
		} else {// 不需要重试，更新调用记录信息
			int count = jobBeforehandRetryDao.updateStatus(jobBeforehandRetry.getId(), JobBeforehandRetryStatus.BIZ_FAIL, jobBeforehandRetry.getStatus(), "业务失败（不可重试的异常）[end]", errMsg);
			logger.info(Log.op(OP).msg("业务执行失败，等待下次重试....").kv("retryId", jobBeforehandRetry.getId()).kv("count", count).toString(), catchedException);
		}
		
	}
	
	private boolean needRetry(BeforehandRetry brAnno, Throwable catchedException) {
		for(Class<?> expClass : brAnno.retryFor()){
			if(catchedException.getClass().isAssignableFrom(expClass)){
				return true;
			}
		}
		return false;
	}

	/**
	 * 分发到对应的重试方法去执行
	 * @param beforehandRetry
	 */
	private synchronized void doDispatch(JobBeforehandRetry beforehandRetry) throws Exception {
		String className = beforehandRetry.getRetryClassName();
		Class<?> retryClass = null;
		
		try {
			retryClass = Class.forName(className);
		} catch (ClassNotFoundException e) {
			logger.info("RetryBeforehandDispatchJob --> 找不到重试的class=[{}]", className, e);
			throw e;
		}
		
		Object retryClassBean = applicationContext.getBean(retryClass);
		BeforehandRetryContext retryContext = buildRetryContext(beforehandRetry);
		Throwable catchedException = null;
		BeforehandRetry brAnno = null;
		try {
			String retryMethod = beforehandRetry.getRetryMethod();
			Class<?>[] retryMethodParamTypes = getRetryMethodParamTypes(retryContext);
			Method method = retryClassBean.getClass().getMethod(retryMethod, retryMethodParamTypes);
			brAnno = getBeforehandRetryAnnotation(retryClassBean, retryMethod, retryMethodParamTypes);
			Assert.notNull(brAnno, "方法[" + className + "#" + retryMethod + "]上的重试注解@BeforehandRetry不能为空");
			ThreadContext.getContext().setRetryThread(true); // 设计重试线程标记
			Object[] args = null;
			if(retryContext.getRetryParamValueMap() != null && retryContext.getRetryParamValueMap().size() >= 1){
				args = retryContext.getRetryParamValueMap().values().toArray();
			}
			method.invoke(retryClassBean, args);
		} catch (Exception e) {
			logger.error(Log.op("RetryBeforehandDispatchJob#doRetry").msg("重试异常").kv("retryId", beforehandRetry.getId()).toString(), e);
			// spring 代理执行成功后，如果方法本身抛出异常则返回的是InvocationTargetException
			catchedException = e instanceof InvocationTargetException ? e.getCause() : e;
			throw e;
		} finally {
			handleRetryResult(beforehandRetry, brAnno, catchedException);
			ThreadContext.clear(); // 重试执行完成，清除标记
		}
		
	}
	
	/**
	 * 获取原始类中retryMethod上的注解。spring的代理类的method拿不到@BeforehandRetry注解
	 * @param retryClassBean spring的代理类
	 * @param retryMethod
	 * @param retryMethodParamTypes
	 * @return
	 */
	private BeforehandRetry getBeforehandRetryAnnotation(Object retryClassBean, String retryMethod, Class<?>[] retryMethodParamTypes) throws Exception {
		Object originInstance = unwrapProxy(retryClassBean);
		Method originMethod = originInstance.getClass().getMethod(retryMethod, retryMethodParamTypes);
		return originMethod.getAnnotation(BeforehandRetry.class);
	}

	/**
	 * 取 spring aop 被代理的原始对象
	 */
	protected Object unwrapProxy(Object bean) throws Exception{

		if (AopUtils.isAopProxy(bean) && bean instanceof Advised) {
			Advised advised = (Advised) bean;
			try {
				bean = advised.getTargetSource().getTarget();
			} catch (Exception e) {
				throw e;
			}
		}
		return bean;
	}

	private Class<?>[] getRetryMethodParamTypes(BeforehandRetryContext retryContext) {
		List<RetryParam> retryParamLs = retryContext.getRetryParamLs();
		if(CollectionUtils.isEmpty(retryParamLs)){
			return null;
		}
		
		Class<?>[] paramTypes = new Class<?>[retryParamLs.size()];
		for(int i=0; i<retryParamLs.size(); i++){
			paramTypes[i] = retryParamLs.get(i).getParamClass();
		}
		return paramTypes;
	}

	private BeforehandRetryContext buildRetryContext(JobBeforehandRetry jobBeforehandRetry) {
		BeforehandRetryContext context = new BeforehandRetryContext();
		context.setJobRetryRecord(jobBeforehandRetry);
		String retryParams = jobBeforehandRetry.getRetryParams();
		if(StringUtils.isNotBlank(retryParams)){
			List<RetryParam> retryParamLs = JSON.parseArray(retryParams, RetryParam.class);
			context.setRetryParamLs(retryParamLs);
			
			ImmutableListMultimap<Class<?>, RetryParam> retryMap = Multimaps.index(retryParamLs.iterator(), new Function<RetryParam, Class<?>>() {
				@Override
				public Class<?> apply(RetryParam input) {
					return input.getParamClass();
				}
			});
			
			ListMultimap<Class<?>, Object> retryParamValueMap = Multimaps.transformEntries(retryMap, new Maps.EntryTransformer<Class<?>, RetryParam, Object>() {
				@Override
				public Object transformEntry(Class<?> key, RetryParam value) {
					return value.retoreParam(key);
				}
			});
			
			context.setRetryParamValueMap(retryParamValueMap);
		}
		return context;
	}

	public long getBizMaxExecuteTime() {
		return bizMaxExecuteTime;
	}

	public void setBizMaxExecuteTime(long bizMaxExecuteTime) {
		this.bizMaxExecuteTime = bizMaxExecuteTime;
	}

	public boolean isRetryBizTimeout() {
		return retryBizTimeout;
	}

	public void setRetryBizTimeout(boolean retryBizTimeout) {
		this.retryBizTimeout = retryBizTimeout;
	}

}
